{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Redis Ingestion Pipeline\n",
    "\n",
    "This walkthrough shows how to use Redis as the vector store, cache, and docstore in an Ingestion Pipeline."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Dependencies\n",
    "\n",
    "Install and start Redis, and set up your OpenAI API key."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Requirement already satisfied: redis in /home/loganm/.cache/pypoetry/virtualenvs/llama-index-4a-wkI5X-py3.11/lib/python3.11/site-packages (5.0.1)\n",
      "Requirement already satisfied: async-timeout>=4.0.2 in /home/loganm/.cache/pypoetry/virtualenvs/llama-index-4a-wkI5X-py3.11/lib/python3.11/site-packages (from redis) (4.0.3)\n",
      "\n",
      "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m A new release of pip is available: \u001b[0m\u001b[31;49m23.2.1\u001b[0m\u001b[39;49m -> \u001b[0m\u001b[32;49m23.3.1\u001b[0m\n",
      "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n"
     ]
    }
   ],
   "source": [
    "!pip install redis"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "338c889086e8649aa80dfb79ebff4fffc98d72fc6d988ac158c6662e9e0cf04b\n"
     ]
    }
   ],
   "source": [
    "!docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "os.environ[\"OPENAI_API_KEY\"] = \"sk-...\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Seed Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Make some test data\n",
    "!rm -rf test_redis_data\n",
    "!mkdir -p test_redis_data\n",
    "!echo \"This is a test file: one!\" > test_redis_data/test1.txt\n",
    "!echo \"This is a test file: two!\" > test_redis_data/test2.txt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/home/loganm/.cache/pypoetry/virtualenvs/llama-index-4a-wkI5X-py3.11/lib/python3.11/site-packages/deeplake/util/check_latest_version.py:32: UserWarning: A newer version of deeplake (3.8.9) is available. It's recommended that you update to the latest version using `pip install -U deeplake`.\n",
      "  warnings.warn(\n"
     ]
    }
   ],
   "source": [
    "from llama_index import SimpleDirectoryReader\n",
    "\n",
    "# load documents with deterministic IDs\n",
    "documents = SimpleDirectoryReader(\n",
    "    \"./test_redis_data\", filename_as_id=True\n",
    ").load_data()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Run the Redis-Based Ingestion Pipeline\n",
    "\n",
    "With a vector store attached, the pipeline will handle upserting data into your vector store.\n",
    "\n",
    "However, if you only want to handle duplicates, you can change the strategy to `DUPLICATES_ONLY`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index.embeddings import HuggingFaceEmbedding\n",
    "from llama_index.ingestion import (\n",
    "    DocstoreStrategy,\n",
    "    IngestionPipeline,\n",
    "    IngestionCache,\n",
    ")\n",
    "from llama_index.ingestion.cache import RedisCache\n",
    "from llama_index.storage.docstore import RedisDocumentStore\n",
    "from llama_index.text_splitter import SentenceSplitter\n",
    "from llama_index.vector_stores import RedisVectorStore\n",
    "\n",
    "embed_model = HuggingFaceEmbedding(model_name=\"BAAI/bge-small-en-v1.5\")\n",
    "\n",
    "pipeline = IngestionPipeline(\n",
    "    transformations=[\n",
    "        SentenceSplitter(),\n",
    "        embed_model,\n",
    "    ],\n",
    "    docstore=RedisDocumentStore.from_host_and_port(\n",
    "        \"localhost\", 6379, namespace=\"document_store\"\n",
    "    ),\n",
    "    vector_store=RedisVectorStore(\n",
    "        index_name=\"redis_vector_store\",\n",
    "        index_prefix=\"vectore_store\",\n",
    "        redis_url=\"redis://localhost:6379\",\n",
    "    ),\n",
    "    cache=IngestionCache(\n",
    "        cache=RedisCache.from_host_and_port(\"localhost\", 6379),\n",
    "        collection=\"redis_cache\",\n",
    "    ),\n",
    "    docstore_strategy=DocstoreStrategy.UPSERTS,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Ingested 2 Nodes\n"
     ]
    }
   ],
   "source": [
    "nodes = pipeline.run(documents=documents)\n",
    "print(f\"Ingested {len(nodes)} Nodes\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Confirm documents are ingested\n",
    "\n",
    "We can create a vector index using our vector store, and quickly ask which documents are seen."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index import VectorStoreIndex, ServiceContext\n",
    "\n",
    "service_context = ServiceContext.from_defaults(embed_model=embed_model)\n",
    "\n",
    "index = VectorStoreIndex.from_vector_store(\n",
    "    pipeline.vector_store, service_context=service_context\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "I see two documents: \"test2.txt\" and \"test1.txt\".\n"
     ]
    }
   ],
   "source": [
    "print(\n",
    "    index.as_query_engine(similarity_top_k=10).query(\n",
    "        \"What documents do you see?\"\n",
    "    )\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Add data and Ingest\n",
    "\n",
    "Here, we can update an existing file, as well as add a new one!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!echo \"This is a test file: three!\" > test_redis_data/test3.txt\n",
    "!echo \"This is a NEW test file: one!\" > test_redis_data/test1.txt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Ingested 2 Nodes\n"
     ]
    }
   ],
   "source": [
    "documents = SimpleDirectoryReader(\n",
    "    \"./test_redis_data\", filename_as_id=True\n",
    ").load_data()\n",
    "\n",
    "nodes = pipeline.run(documents=documents)\n",
    "\n",
    "print(f\"Ingested {len(nodes)} Nodes\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "I see three documents: test3.txt, test1.txt, and test2.txt.\n",
      "This is a test file: three!\n",
      "This is a NEW test file: one!\n",
      "This is a test file: two!\n"
     ]
    }
   ],
   "source": [
    "index = VectorStoreIndex.from_vector_store(\n",
    "    pipeline.vector_store, service_context=service_context\n",
    ")\n",
    "\n",
    "response = index.as_query_engine(similarity_top_k=10).query(\n",
    "    \"What documents do you see?\"\n",
    ")\n",
    "\n",
    "print(response)\n",
    "\n",
    "for node in response.source_nodes:\n",
    "    print(node.get_text())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As we can see, the data was deduplicated and upserted correctly! Only three nodes are in the index, even though we ran the full pipeline twice."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "llama-index-4a-wkI5X-py3.11",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
